/**
 * 
 */
package com.fnic.pearl.scheduler.mod;

import java.io.IOException;
import java.lang.reflect.Type;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.List;

import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.ParseException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.Logger;

import com.fnic.pearl.scheduler.SchedulerConfig;
import com.fnic.pearl.scheduler.constant.CondHeaderField;
import com.fnic.pearl.scheduler.constant.PearlErrorCode;
import com.fnic.pearl.scheduler.constant.SchedulerUrl;
import com.fnic.pearl.scheduler.constant.TestTaskStatus;
import com.fnic.pearl.scheduler.dao.SchedulerStore;
import com.fnic.pearl.scheduler.model.ProbeTaskStatus;
import com.fnic.pearl.scheduler.model.SessionKey;
import com.fnic.pearl.scheduler.model.TaskStats;
import com.fnic.pearl.scheduler.model.TestTask;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSyntaxException;
import com.google.gson.reflect.TypeToken;

/**
 * 心跳处理在单独的线程中进行处理，负责：
 * 
 * - 与指定 HTTP 服务端的连接； - 与该 HTTP 服务端心跳包的维护； - 解析心跳响应，并将响应内容提交至业务成的队列中
 * 
 * @author HuHaiyang
 * @date 2013年7月16日
 */
public class HeartBeatRunnable implements Runnable {

	private static Logger LOG = Logger.getLogger(HeartBeatRunnable.class);

	private String serverIp = null;

	private short serverPort = 0;

	private String hbUrl = null;

	private HttpClient client = new DefaultHttpClient();

	private final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

	private Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation()
			.registerTypeAdapter(Date.class, new JsonDeserializer<Date>() {

				public Date deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
						throws JsonParseException {
					if (!(json instanceof JsonPrimitive)) {
						throw new JsonParseException("deserialize Date failed, date should be string!");
					}

					if ("".equals(json.getAsString().trim())) {
						return null;
					}

					try {
						Date date = dateFormat.parse(json.getAsString());
						return date;
					} catch (java.text.ParseException pe) {
						throw new JsonParseException("Unparseable date: " + json.getAsString());
					}
				}
			}).create();

	/**
	 * 在创建实例时就需要指定服务端的地址与端口号
	 * 
	 * @param serverIp
	 *            服务器端地址
	 * @param serverPort
	 *            服务端口号
	 */
	public HeartBeatRunnable(String serverIp, short serverPort) {
		super();
		this.serverIp = serverIp;
		this.serverPort = serverPort;
		this.hbUrl = new StringBuilder().append("http://").append(serverIp).append(":").append(serverPort)
				.append(SchedulerUrl.HEARTBEAT_MANAGER).append("?").append(CondHeaderField.COND_SCHEDULER_ID)
				.append("=").append(SchedulerConfig.getInstance().getSchedulerId()).toString();

		HttpParams params = client.getParams();
		HttpConnectionParams.setConnectionTimeout(params,
				(int) SchedulerConfig.getInstance().getManagerTimeout() * 1000);
		HttpConnectionParams.setSoTimeout(params, (int) SchedulerConfig.getInstance().getManagerTimeout() * 1000);
	}

	/**
	 * @see java.lang.Runnable#run()
	 */
	public void run() {
		while (true) {
			try {
				// 执行心跳
				LOG.info("调度器心跳 |" + AuthManager.getInstance().isLoginManager() + "|剩余尝试次数|"
						+ AuthManager.getInstance().getLimit() + "|" + hbUrl);
				doHeartBeat();
			} catch (Exception e) {
				LOG.error("心跳异常|" + e.getMessage(), e);
			}
			sleep();
		}
	}

	/**
	 * 执行心跳
	 * 
	 *
	 */
	private void doHeartBeat() {
		if (!AuthManager.getInstance().loginManager(serverIp, serverPort)) {
			return;
		}
		HttpResponse response = null;
		HttpPost post = new HttpPost(hbUrl);
		try {
			String sessionId = AuthManager.getInstance().getSessionIdWithManager();
			post.addHeader(CondHeaderField.HEADER_COOKIE, CondHeaderField.SESSION_ID + "=" + sessionId);
			response = client.execute(post);
			if (response.getStatusLine().getStatusCode() != 200) {
				return;
			}
			// 执行心跳任务
			HttpEntity entity = response.getEntity();
			// if (entity != null && entity.getContentLength() > 0) {
			if (entity != null) {
				doResponse(entity, response);
			}
		} catch (Exception e) {
			// 计数器减
			AuthManager.getInstance().countDown();
			LOG.error("与管理节点心跳出错|" + e.getMessage());
			return;
		} finally {
			post.releaseConnection();
		}
		// 心跳恢复正常
		AuthManager.getInstance().restore();
	}

	/**
	 * 执行心跳中返回的任务
	 * 
	 * @param entity
	 * @param response
	 *
	 */
	private void doResponse(HttpEntity entity, HttpResponse response) {
		Header[] headersType = response.getHeaders(CondHeaderField.HB_BODY_TYPE);
		try {
			String body = EntityUtils.toString(entity);
			if (headersType.length != 1) {
				return;
			} else {

				String ht = headersType[0].getValue();
				if (ht.equals(CondHeaderField.HB_BTYPE_TESTASK)) {
					processTestTask(body);
				} else if (ht.equals(CondHeaderField.HB_BTYPE_SESSION_KEY)) {
					processSessionKey(body);
				} else if (ht.equals(CondHeaderField.HB_BTYPE_VER_UPD)) {
					LOG.info("versionUpdate: " + body);
				} else if (ht.equals("SchduleNotLogin")) {
					// 控制器重启，调度器重新login
					AuthManager.getInstance().controllRestart(serverIp, serverPort);
				} else {
					LOG.warn("unsupported hb type: " + ht);
				}
			}
		} catch (ParseException e) {
			LOG.error("parse exception - " + e.getMessage());
		} catch (IOException e) {
			LOG.error("parse exception - " + e.getMessage());
		}

	}

	/**
	 * 转换为 测量任务记录，并做基本校验，以及加入初始化状态的任务队列
	 */
	private boolean processTestTask(String body) {
		try {
			List<TestTask> tts = gson.fromJson(body, new TypeToken<List<TestTask>>() {
			}.getType());
			for (TestTask tt : tts) {
				int errorCode = PearlErrorCode.SUCCESS;
				if (tt == null) {
					LOG.warn("invalid testask, is null!");
				} else if (tt.getTaskId() <= 0) {
					LOG.warn("invalid testask, taskId [" + tt.getTaskId() + "] <= 0");
					errorCode = PearlErrorCode.SCHD_ERR_SVC_TASK_ID_INVALID;
				} else if (!(tt.getProbe() != null && tt.getProbe() > 0)) {
					LOG.warn("invalid testask, Probe or serial is null or less than 0");
					errorCode = PearlErrorCode.SCHD_ERR_SVC_PROBE_ID_INVALID;
				} else if (tt.getToolName() == null || tt.getToolName().length() <= 0) {
					LOG.warn("invalid testask, Tool is null or less than 0");
					errorCode = PearlErrorCode.SCHD_ERR_SVC_TOOL_INVALID;
				} else if (tt.getRun() != TestTask.RUN_IMMEDIATE && tt.getRun() != TestTask.RUN_TIMING) {
					LOG.warn("invalid testask Run [" + tt.getRun() + "]");
					errorCode = PearlErrorCode.SCHD_ERR_SVC_RUN_INVALID;
				} else if (tt.getEndTime() != null) {
					final Calendar now = Calendar.getInstance();
					Calendar endCal = Calendar.getInstance();
					endCal.setTime(tt.getEndTime());
					if (now.compareTo(endCal) >= 0) {
						LOG.info("invalid testask, now >= end");
						errorCode = PearlErrorCode.SCHD_ERR_SVC_INVALID_END_TIME;
					}
				}

				if (errorCode != PearlErrorCode.SUCCESS) {
					ProbeTaskStatus pts = new ProbeTaskStatus(tt.getProbe(), tt.getTaskId(),
							TestTaskStatus.S_FAILED_TASK_PARSE);
					pts.setExecNum(tt.getExecNum());
					pts.setErrorNo(errorCode);
					SchedulerStore.getInstance().putTestTaskStatus(pts);
					continue;
				}

				LOG.info("recv testask - " + tt);
				tt.setStatus(TestTaskStatus.S_NEW_TASK);
				TaskStats.getInstance().incrRecvTaskNum();
				SchedulerStore.getInstance().putInitTestTask(tt);
			}
		} catch (ParseException e) {
			LOG.warn("Parse entity exception: " + e.getMessage());
			return false;
		} catch (JsonSyntaxException jse) {
			LOG.warn("test task json syntax parse: " + jse.getMessage());
			return false;
		}

		return true;
	}

	/**
	 * 解析 SessionKey 记录，并加入至待下发至 probe 的 session key 队列中
	 */
	public boolean processSessionKey(String body) {
		try {
			SessionKey sessionKey = gson.fromJson(body, SessionKey.class);
			ProbeSessionKeyManager.getInstance().putWissue(sessionKey.getProbeId(), sessionKey);
			LOG.info("recv SessionKey - " + body + " | " + sessionKey);
		} catch (ParseException e) {
			LOG.warn("Parse sessionKey exception: " + e.getMessage());
			return false;
		} catch (JsonSyntaxException jse) {
			LOG.warn("sessionKey json syntax parse: " + jse.getMessage());
			return false;
		}
		return true;
	}

	/**
	 * 睡眠
	 * 
	 *
	 */
	private void sleep() {
		try {
			Thread.sleep(SchedulerConfig.getInstance().getManagerTimeout() * 1000);
		} catch (InterruptedException e1) {
			LOG.warn("login manager failed - " + e1.getMessage());
		}
	}
}
